package com.flink.day07_flink_cdc;


import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;
import java.util.Properties;

/**
 * @description: Flink-CDC custom deserialization schema
 * @author: HaoWu
 * @create: 2021-05-24
 */
public class Flink03_Flink_CDC_CustomerSchema {
    public static void main(String[] args) throws Exception {
        //1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2. Create the Flink-MySQL-CDC source
        Properties properties = new Properties();
        //initial (default): performs an initial snapshot of the monitored tables on first startup, then continues reading the latest binlog.
        //latest-offset: never snapshots on first startup; reads from the end of the binlog, i.e. only changes made after the connector started.
        //timestamp: never snapshots on first startup; traverses the binlog from the beginning and ignores change events whose timestamp is smaller than the specified one.
        //specific-offset: never snapshots on first startup; reads the binlog directly from the specified offset.
        properties.setProperty("debezium.snapshot.mode", "initial");
        DebeziumSourceFunction<JSONObject> mysqlSource = MySQLSource
                .<JSONObject>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("root")
                .databaseList("gmall-flink-200821")
                .tableList("gmall-flink-200821.z_user_info") //可选配置项,如果不指定该参数,则会读取上一个配置下的所有表的数据,注意：指定的时候需要使用"db.table"的方式
                .debeziumProperties(properties)
                .deserializer(new CdcDwdDeserializationSchema()) //custom deserializer that parses the CDC record format
                .build();
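        // Hedged sketch: several tables can be captured by listing them in "db.table"
        // form in the builder above (the second table name here is hypothetical):
        //   .tableList("gmall-flink-200821.z_user_info", "gmall-flink-200821.z_order_info")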

        //3. Read data from MySQL with the CDC source
        DataStreamSource<JSONObject> mysqlDS = env.addSource(mysqlSource);

        //4. Print the records
        mysqlDS.print();

        //5. Execute the job
        env.execute();
    }
}


/**
 * Custom deserializer that parses the CDC record format; supports all operations
 * (insert / update / delete).
 */
class CdcDwdDeserializationSchema implements DebeziumDeserializationSchema<JSONObject> {
    private static final long serialVersionUID = -3168848963265670603L;

    public CdcDwdDeserializationSchema() {
    }

    @Override
    public void deserialize(SourceRecord record, Collector<JSONObject> out) throws Exception {
        Struct dataRecord = (Struct) record.value();

        Struct afterStruct = dataRecord.getStruct("after");
        Struct beforeStruct = dataRecord.getStruct("before");
        /*
          1. If both beforeStruct and afterStruct are present, the record is an update.
          2. If only beforeStruct is present, it is a delete.
          3. If only afterStruct is present, it is an insert.
         */
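        /*
         * For reference, a simplified sketch of the Debezium envelope this method
         * consumes (field values are illustrative, not from a real record):
         *
         *   {
         *     "before": { "id": 1, "name": "a" },   // null for inserts
         *     "after":  { "id": 1, "name": "b" },   // null for deletes
         *     "source": { "db": "...", "table": "...", "ts_ms": 1621843200000 },
         *     "op": "u"
         *   }
         */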

        JSONObject logJson = new JSONObject();

        String canal_type = "";
        Struct valueStruct = null;
        if (afterStruct != null && beforeStruct != null) {
            System.out.println("update record");
            canal_type = "update";
            valueStruct = afterStruct;
        } else if (afterStruct != null) {
            System.out.println("insert record");
            canal_type = "insert";
            valueStruct = afterStruct;
        } else if (beforeStruct != null) {
            System.out.println("delete record");
            canal_type = "delete";
            valueStruct = beforeStruct;
        } else {
            System.out.println("unexpected record: neither before nor after struct is present");
        }
        // Copy every field name/value of the relevant struct into the output JSON
        if (valueStruct != null) {
            for (Field field : valueStruct.schema().fields()) {
                logJson.put(field.name(), valueStruct.get(field));
            }
        }
        // Database / table metadata from the Debezium "source" struct
        Struct source = dataRecord.getStruct("source");
        Object db = source.get("db");
        Object table = source.get("table");
        Object ts_ms = source.get("ts_ms");

        logJson.put("canal_database", db);
        logJson.put("canal_table", table);
        logJson.put("canal_ts", ts_ms);
        logJson.put("canal_type", canal_type);

        // Topic of the source record, in Debezium's serverName.database.table form
        String topic = record.topic();
        System.out.println("topic = " + topic);

        // Primary-key fields: combine the hash codes of all PK values to derive a
        // stable bucket number
        Struct pk = (Struct) record.key();
        List<Field> pkFieldList = pk.schema().fields();
        int partitionerNum = 0;
        for (Field field : pkFieldList) {
            Object pkValue = pk.get(field.name());
            partitionerNum += pkValue.hashCode();
        }
        // floorMod keeps the result in [0, 3) even for negative hash sums, which
        // Math.abs(...) % 3 would not guarantee for Integer.MIN_VALUE; 3 is the
        // assumed number of downstream partitions
        int hash = Math.floorMod(partitionerNum, 3);
        logJson.put("pk_hashcode", hash);
        out.collect(logJson);
    }
    @Override
    public TypeInformation<JSONObject> getProducedType() {
        return TypeInformation.of(JSONObject.class);
    }
}
